LIB = libblktap.so libblktap.so.$(MAJOR) libblktap.so.$(MAJOR).$(MINOR)
-all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax
+all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax parallax-threaded blockstored
$(MAKE) $(LIB)
LINUX_ROOT := $(wildcard $(XEN_ROOT)/linux-2.6.*-xen-sparse)
$(CC) $(CFLAGS) -o blkaio -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap blkaio.c blkaiolib.c -laio -lpthread
parallax: $(LIB) $(PLX_SRCS)
- $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap $(PLX_SRCS) libgnbd/libgnbd.a
+ $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap -lpthread $(PLX_SRCS) libgnbd/libgnbd.a
parallax-threaded: $(LIB) $(PLXT_SRCS)
$(CC) $(CFLAGS) -o parallax-threaded -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lpthread -lblktap $(PLXT_SRCS) libgnbd/libgnbd.a
vdi_test: $(LIB) $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE -lpthread $(VDI_SRCS)
vdi_list: $(LIB) vdi_list.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c -lpthread $(VDI_SRCS)
vdi_create: $(LIB) vdi_create.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c -lpthread $(VDI_SRCS)
vdi_snap: $(LIB) vdi_snap.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c -lpthread $(VDI_SRCS)
vdi_snap_list: $(LIB) vdi_snap_list.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c -lpthread $(VDI_SRCS)
vdi_snap_delete: $(LIB) vdi_snap_delete.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c -lpthread $(VDI_SRCS)
vdi_tree: $(LIB) vdi_tree.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c -lpthread $(VDI_SRCS)
vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c -lpthread $(VDI_SRCS)
vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS)
- $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS)
+ $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c -lpthread $(VDI_SRCS)
blockstored: blockstored.c
- $(CC) $(CFLAGS) -g3 -o blockstored blockstored.c
+ $(CC) $(CFLAGS) -g3 -o blockstored -lpthread blockstored.c
bstest: bstest.c blockstore.c
- $(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c
+ $(CC) $(CFLAGS) -g3 -o bstest bstest.c -lpthread blockstore.c
.PHONY: TAGS clean install mk-symlinks rpm
TAGS:
}
}
+static pthread_mutex_t push_mutex = PTHREAD_MUTEX_INITIALIZER;
+
void blktap_inject_response(blkif_response_t *rsp)
{
+
apply_rsp_hooks(rsp);
+
write_rsp_to_fe_ring(rsp);
+
+ pthread_mutex_lock(&push_mutex);
+
RING_PUSH_RESPONSES(&fe_ring);
ioctl(fd, BLKTAP_IOCTL_KICK_FE);
+
+ pthread_mutex_unlock(&push_mutex);
}
/*-----[ Polling fd listeners ]------------------------------------------*/
}
/* Using this as a unidirectional ring. */
ctrl_ring.req_cons = ctrl_ring.rsp_prod_pvt = i;
+pthread_mutex_lock(&push_mutex);
RING_PUSH_RESPONSES(&ctrl_ring);
+pthread_mutex_unlock(&push_mutex);
/* empty the fe_ring */
notify_fe = 0;
if (notify_be) {
DPRINTF("notifying be\n");
+pthread_mutex_lock(&push_mutex);
RING_PUSH_REQUESTS(&be_ring);
ioctl(fd, BLKTAP_IOCTL_KICK_BE);
+pthread_mutex_unlock(&push_mutex);
}
if (notify_fe) {
DPRINTF("notifying fe\n");
+pthread_mutex_lock(&push_mutex);
RING_PUSH_RESPONSES(&fe_ring);
ioctl(fd, BLKTAP_IOCTL_KICK_FE);
+pthread_mutex_unlock(&push_mutex);
}
}
}
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/time.h>
#include <stdarg.h>
#include "blockstore.h"
#include <pthread.h>
#include "parallax-threaded.h"
#define BLOCKSTORE_REMOTE
-#define BSDEBUG
+//#define BSDEBUG
+
+#define RETRY_TIMEOUT 1000000 /* microseconds */
/*****************************************************************************
* Debugging
struct sockaddr_in sin_local;
int bssock = 0;
+/*****************************************************************************
+ * Notification *
+ *****************************************************************************/
+
+typedef struct pool_thread_t_struct {
+ pthread_mutex_t ptmutex;
+ pthread_cond_t ptcv;
+ int newdata;
+} pool_thread_t;
+
+pool_thread_t pool_thread[READ_POOL_SIZE+1];
+
+#define RECV_NOTIFY(tid) { \
+ pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
+ pool_thread[tid].newdata = 1; \
+ DB("CV Waking %u", tid); \
+ pthread_cond_signal(&(pool_thread[tid].ptcv)); \
+ pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
+#define RECV_AWAIT(tid) { \
+ pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
+ if (pool_thread[tid].newdata) { \
+ pool_thread[tid].newdata = 0; \
+ DB("CV Woken %u", tid); \
+ } \
+ else { \
+ DB("CV Waiting %u", tid); \
+ pthread_cond_wait(&(pool_thread[tid].ptcv), \
+ &(pool_thread[tid].ptmutex)); \
+ } \
+ pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
+
/*****************************************************************************
* Message queue management *
*****************************************************************************/
#define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv)
#define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv)
-int notify = 0;
-pthread_mutex_t ptmutex_notify;
-pthread_cond_t ptcv_notify;
-#define RECV_NOTIFY { \
- pthread_mutex_lock(&ptmutex_notify); \
- notify = 1; \
- pthread_cond_signal(&ptcv_notify); \
- pthread_mutex_unlock(&ptmutex_notify); }
-#define RECV_AWAIT { \
- pthread_mutex_lock(&ptmutex_notify); \
- if (notify) \
- notify = 0; \
- else \
- pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \
- pthread_mutex_unlock(&ptmutex_notify); }
-
-
/* A message queue entry. We allocate one of these for every request we send.
* Asynchronous reply reception also used one of these.
*/
int length;
struct msghdr msghdr;
struct iovec iov[2];
+ int tid;
+ struct timeval tv_sent;
bshdr_t message;
void *block;
} bsq_t;
qe->message.luid = new_luid();
qe->status = 0;
+ qe->tid = (int)pthread_getspecific(tid_key);
if (enqueue(qe) < 0) {
fprintf(stderr, "Error enqueuing request.\n");
return -1;
}
+ gettimeofday(&(qe->tv_sent), NULL);
DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
//rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
int wait_recv(bsq_t **reqs, int numreqs) {
bsq_t *q, *m;
unsigned int x, i;
+ int tid = (int)pthread_getspecific(tid_key);
DB("ENTER wait_recv %u\n", numreqs);
return numreqs;
}
- RECV_AWAIT;
+ RECV_AWAIT(tid);
/*
rxagain:
}
+/* retry
+ */
+static int retry_count = 0;
+int retry(bsq_t *qe)
+{
+ int rc;
+ gettimeofday(&(qe->tv_sent), NULL);
+ DB("retry to %d luid=%016llx\n", qe->server, qe->message.luid);
+ retry_count++;
+ rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
+ if (rc < 0)
+ return rc;
+ return 0;
+}
+
+/* queue runner
+ */
+void *queue_runner(void *arg)
+{
+ for (;;) {
+ struct timeval now;
+ long long nowus, sus;
+ bsq_t *q;
+ int r;
+
+ sleep(1);
+
+ gettimeofday(&now, NULL);
+ nowus = now.tv_usec + now.tv_sec * 1000000;
+ ENTER_QUEUE_CR;
+ r = retry_count;
+ for (q = bs_head; q; q = q->next) {
+ sus = q->tv_sent.tv_usec + q->tv_sent.tv_sec * 1000000;
+ if ((nowus - sus) > RETRY_TIMEOUT) {
+ if (retry(q) < 0) {
+ fprintf(stderr, "Error on sendmsg retry.\n");
+ }
+ }
+ }
+ if (r != retry_count) {
+ fprintf(stderr, "RETRIES: %u %u\n", retry_count - r, retry_count);
+ }
+ LEAVE_QUEUE_CR;
+ }
+}
+
/* receive loop
*/
void *receive_loop(void *arg)
}
else {
DB("RX MATCH");
- RECV_NOTIFY;
+ RECV_NOTIFY(m->tid);
}
}
}
pthread_mutex_init(&ptmutex_queue, NULL);
pthread_mutex_init(&ptmutex_luid, NULL);
pthread_mutex_init(&ptmutex_recv, NULL);
- pthread_mutex_init(&ptmutex_notify, NULL);
- pthread_cond_init(&ptcv_notify, NULL);
+ /*pthread_mutex_init(&ptmutex_notify, NULL);*/
+ for (i = 0; i <= READ_POOL_SIZE; i++) {
+ pool_thread[i].newdata = 0;
+ pthread_mutex_init(&(pool_thread[i].ptmutex), NULL);
+ pthread_cond_init(&(pool_thread[i].ptcv), NULL);
+ }
bsservers[0].hostname = "firebug.cl.cam.ac.uk";
bsservers[1].hostname = "planb.cl.cam.ac.uk";
}
pthread_create(&pthread_recv, NULL, receive_loop, NULL);
+ pthread_create(&pthread_recv, NULL, queue_runner, NULL);
#else /* /BLOCKSTORE_REMOTE */
block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
void __exit_blockstore(void)
{
+ int i;
pthread_mutex_destroy(&ptmutex_recv);
pthread_mutex_destroy(&ptmutex_luid);
pthread_mutex_destroy(&ptmutex_queue);
- pthread_mutex_destroy(&ptmutex_notify);
- pthread_cond_destroy(&ptcv_notify);
+ /*pthread_mutex_destroy(&ptmutex_notify);
+ pthread_cond_destroy(&ptcv_notify);*/
+ for (i = 0; i <= READ_POOL_SIZE; i++) {
+ pthread_mutex_destroy(&(pool_thread[i].ptmutex));
+ pthread_cond_destroy(&(pool_thread[i].ptcv));
+ }
}